Conversation
…frame format with OCCURS DEPENDING ON clause.
Walkthrough

Refactors writer AST and RDD processing to support OCCURS and OCCURS DEPENDING ON (variable-size arrays) via dependee tracking; moves primitive field extraction to …

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Writer
    participant Combiner
    participant Builder
    participant DepMap
    participant Processor
    User->>Writer: combine(rdd, copybook, schema)
    Writer->>Combiner: constructWriterAst(copybook, schema)
    Combiner->>DepMap: create empty dependeeMap
    Combiner->>Builder: buildGroupField(..., dependeeMap)
    Builder->>Builder: buildPrimitiveNode(..., dependeeMap)
    alt field has DEPENDING ON
        Builder->>DepMap: register DependingOnField(name -> spec)
        Builder-->>Combiner: emit PrimitiveDependeeField / arrays with dependingOn
    end
    Combiner-->>Writer: return root WriterAst
    Writer->>Processor: processRDD(..., variableSizeOccurs)
    Processor->>Processor: iterate rows, call writeToBytes(ast, ..., variableLengthOccurs)
    rect rgba(100,200,100,0.5)
        Processor->>DepMap: consult dependeeMap for dependee values
        Processor->>Processor: compute dynamic OCCURS sizes, write arrays accordingly
    end
    Processor-->>Writer: RDD[Array[Byte]]
    Writer-->>User: output RDD
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 inconclusive
JaCoCo code coverage report - 'cobol-parser'
JaCoCo code coverage report - 'spark-cobol'
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala (1)
119-145: ⚠️ Potential issue | 🟠 Major

Reject `variable_size_occurs` for fixed-length writes.

Now that `variable_size_occurs` is allowed for writing, `NestedRecordCombiner.processRDD()` can emit shorter byte arrays for `record_format = F`. Without RDW or any other framing, that turns the output into an undecodable flat file as soon as one record shrinks. Please fail this combination here, or keep fixed-length writes untrimmed.

⚙️ Proposed validation

```diff
   if (readerParameters.recordFormat != RecordFormat.FixedLength && readerParameters.recordFormat != RecordFormat.VariableLength) {
     issues += s"Only '${RecordFormat.FixedLength}' and '${RecordFormat.VariableLength}' values for 'record_format' are supported for writing, " +
       s"provided value: '${readerParameters.recordFormat}'"
   }
+
+  if (readerParameters.variableSizeOccurs &&
+      readerParameters.recordFormat != RecordFormat.VariableLength) {
+    issues += "'variable_size_occurs' is supported for writing only when 'record_format' is 'V'"
+  }

   if (readerParameters.occursMappings.nonEmpty) {
     issues += "OCCURS mapping option ('occurs_mappings') is not supported for writing"
   }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala` around lines 119 - 145, The validator must reject the combination of fixed-length output and variable-size OCCURS; update validateParametersForWriting (in CobolParametersValidator) to add a check that if readerParameters.recordFormat == RecordFormat.FixedLength and the variable-size-occurs option is enabled (readerParameters.variableSizeOccurs / variable_size_occurs flag on ReaderParameters), append an error like "variable_size_occurs is not supported for fixed-length writes" to issues so the IllegalArgumentException is thrown; this ensures fixed-length writes remain untrimmed and prevents emitting shorter byte arrays.
🧹 Nitpick comments (1)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala (1)
134-160: Add a `VB` rejection regression here too.

This negative-path test currently proves `FB` is rejected, but not `VB`. Since issue #797 explicitly mentions VB, a dedicated `record_format = "VB"` assertion would make the intended writer contract much clearer and protect it from regressions.

Based on learnings, variable-block format writing will not be implemented in the writer; only VariableLength uses RDW headers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala` around lines 134 - 160, The test in VariableLengthEbcdicWriterSuite ("throw an exception on unexpected output record format") only asserts that "FB" is rejected but misses asserting that "VB" is rejected; add a second negative-path check (or extend the existing intercept) that writes with .option("record_format", "VB") using the same df/path setup and asserts the thrown IllegalArgumentException message includes the same "Only 'F' and 'V' values for 'record_format' are supported for writing, provided value: 'VB'" text so VB regressions are caught (refer to the test block variables df, path and the intercept/exception handling used in the current test).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 418-426: The null-array branches in NestedRecordCombiner are
leaving DEPENDING ON fields as raw zeros in fixed-length mode because
Copybook.setPrimitiveField is only invoked when variableLengthOccurs is true;
update both null branches (around the blocks that now return 0 and
cobolField.binaryProperties.actualSize, also the similar case at lines 459-466)
to always call Copybook.setPrimitiveField(spec.cobolField, ar, 0,
fieldStartOffsetOverride = spec.baseOffset) for each spec in dependingOn before
returning, so the dependee PrimitiveDependeeField gets an encoded COBOL zero
even when the array is null and variableLengthOccurs is false.
---
Outside diff comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`:
- Around line 119-145: The validator must reject the combination of fixed-length
output and variable-size OCCURS; update validateParametersForWriting (in
CobolParametersValidator) to add a check that if readerParameters.recordFormat
== RecordFormat.FixedLength and the variable-size-occurs option is enabled
(readerParameters.variableSizeOccurs / variable_size_occurs flag on
ReaderParameters), append an error like "variable_size_occurs is not supported
for fixed-length writes" to issues so the IllegalArgumentException is thrown;
this ensures fixed-length writes remain untrimmed and prevents emitting shorter
byte arrays.
---
Nitpick comments:
In
`@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala`:
- Around line 134-160: The test in VariableLengthEbcdicWriterSuite ("throw an
exception on unexpected output record format") only asserts that "FB" is
rejected but misses asserting that "VB" is rejected; add a second negative-path
check (or extend the existing intercept) that writes with
.option("record_format", "VB") using the same df/path setup and asserts the
thrown IllegalArgumentException message includes the same "Only 'F' and 'V'
values for 'record_format' are supported for writing, provided value: 'VB'" text
so VB regressions are caught (refer to the test block variables df, path and the
intercept/exception handling used in the current test).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b88d503c-8012-4362-bbf1-6c0ad43aa41d
📒 Files selected for processing (8)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Statement.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/DependingOnField.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/WriterAst.scala
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/VariableLengthEbcdicWriterSuite.scala
Actionable comments posted: 1
🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala (1)
370-373: Mutable state in shared AST node is safe but fragile.
`DependingOnField.baseOffset` is mutated at line 372 for every row within the same partition. This works correctly because:

- Rows are processed sequentially within `mapPartitions`
- Each row gets a fresh byte array
- `baseOffset` is always updated before being read in the subsequent `dependingOn.foreach` calls

However, this design is fragile: if the processing model ever changes to concurrent row handling, this will silently corrupt data. Consider adding a brief comment explaining why this mutation is safe.
📝 Suggested documentation

```diff
   // ── Primitive which has an OCCURS DEPENDS ON ─────────────────────────────
   case PrimitiveDependeeField(spec) =>
+    // NOTE: baseOffset is mutated here for each row. This is safe because rows
+    // are processed sequentially within mapPartitions, and the offset is always
+    // updated before being read in subsequent array-element writes.
     spec.baseOffset = currentOffset
     spec.cobolField.binaryProperties.actualSize
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala` around lines 370 - 373, The mutation of DependingOnField.baseOffset inside the PrimitiveDependeeField match is fragile; add a concise comment near the assignment (spec.baseOffset = currentOffset) explaining that this is safe because rows are processed sequentially within mapPartitions, each row uses a fresh byte array, and baseOffset is always updated before any dependingOn.foreach reads it, and note that concurrent processing would break this invariant; reference PrimitiveDependeeField, DependingOnField.baseOffset, mapPartitions, and dependingOn.foreach in the comment so future maintainers understand the assumption.
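The sequential-safety argument above can be sketched without Spark at all. The snippet below is a simplified model, not Cobrix code: `OffsetSpec` and `processRows` are made-up names standing in for `DependingOnField` and the per-row write loop.

```scala
// Simplified model (not Cobrix code) of the pattern discussed above: a shared
// mutable offset on a spec object that is rewritten for every row before the
// row reads it back. Safe only because rows are handled one at a time.
final class OffsetSpec(var baseOffset: Int = 0)

object SequentialMutationDemo {
  // Sequential processing, as inside a single mapPartitions iterator:
  // the write (spec.baseOffset = ...) always happens before the read
  // for the same row, so each row observes its own offset.
  def processRows(spec: OffsetSpec, rowCount: Int): Seq[Int] =
    (0 until rowCount).map { row =>
      spec.baseOffset = row * 100 // analogous to spec.baseOffset = currentOffset
      spec.baseOffset             // analogous to the later dependingOn.foreach read
    }

  def main(args: Array[String]): Unit = {
    println(processRows(new OffsetSpec, 4)) // Vector(0, 100, 200, 300)
    // If rows ran concurrently, another row's write could land between this
    // row's write and read: the silent corruption the comment warns about.
  }
}
```

If rows were handed to multiple threads sharing one `OffsetSpec`, the interleaving of writes and reads would no longer be ordered, which is exactly the invariant the suggested comment documents.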
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Line 281: Fix the comment typo inside NestedRecordCombiner.scala near the
comment above the dependee handling: change "defines" to "defined" in the
comment "// Dependee fields need not to be defines in Spark schema." so it reads
"// Dependee fields need not to be defined in Spark schema." to correct the
spelling in the NestedRecordCombiner class.
---
Nitpick comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 370-373: The mutation of DependingOnField.baseOffset inside the
PrimitiveDependeeField match is fragile; add a concise comment near the
assignment (spec.baseOffset = currentOffset) explaining that this is safe
because rows are processed sequentially within mapPartitions, each row uses a
fresh byte array, and baseOffset is always updated before any
dependingOn.foreach reads it, and note that concurrent processing would break
this invariant; reference PrimitiveDependeeField, DependingOnField.baseOffset,
mapPartitions, and dependingOn.foreach in the comment so future maintainers
understand the assumption.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 7883dcea-55d1-4916-8681-7c58385e6b5c
📒 Files selected for processing (3)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidatorSuite.scala
Actionable comments posted: 2
🧹 Nitpick comments (1)
spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala (1)
467-476: Reduce coupling to AST layout and exact exception wording.

This test depends on `children(5)` and the full message with parser line numbers, so harmless AST or wording refactors will break it even if duplicate-dependee validation still works. Prefer selecting the second `CNT1` by name and asserting only the stable part of the error.

♻️ Suggested cleanup

```diff
-    val ast = parsedCopybook.ast
-    val children = ast.children.head.asInstanceOf[Group].children
-    val cnt2 = children(5).asInstanceOf[Primitive].withUpdatedIsDependee(true)
-    children(5) = cnt2
+    val record = parsedCopybook.ast.children.head.asInstanceOf[Group]
+    val cnt1Indexes = record.children.zipWithIndex.collect {
+      case (p: Primitive, idx) if p.name.equalsIgnoreCase("CNT1") => idx
+    }
+    assert(cnt1Indexes.size == 2)
+    val secondCnt1Index = cnt1Indexes(1)
+    record.children(secondCnt1Index) =
+      record.children(secondCnt1Index).asInstanceOf[Primitive].withUpdatedIsDependee(true)

     val ex = intercept[IllegalArgumentException] {
       NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema)
     }

-    assert(ex.getMessage == "Duplicate field name 'CNT1' found in copybook. Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used. Already found a dependee field with the same name at line 4, current field line number: 10.")
+    assert(ex.getMessage.contains("Duplicate field name 'CNT1' found in copybook."))
+    assert(ex.getMessage.contains("Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used."))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala` around lines 467 - 476, The test in NestedWriterSuite is brittle because it relies on AST index children(5) and exact exception text; update the test so it finds the second CNT1 node by name (e.g., walk parsedCopybook.ast to locate the Primitive node whose name equals "CNT1" and then call withUpdatedIsDependee on that node) instead of using children(5), and change the assertion on the thrown IllegalArgumentException from exact string equality to a contains check for the stable fragment (for example verify ex.getMessage contains "Duplicate field name 'CNT1'") while keeping the call to NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema).
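The rule the suite pins down can be sketched independently of the copybook parser. The helper below is illustrative only: `firstDuplicate` is a made-up name, and it checks case-insensitive uniqueness the way the error message describes, not via the actual Cobrix AST.

```scala
// Standalone sketch (not the Cobrix implementation) of the rule the suite
// exercises: once OCCURS DEPENDING ON makes fields addressable as dependees,
// field names must be unique case-insensitively, and the first duplicate
// encountered should be reported.
object DuplicateDependeeCheck {
  def firstDuplicate(fieldNames: Seq[String]): Option[String] = {
    val seen = scala.collection.mutable.Set.empty[String]
    var dup: Option[String] = None
    for (name <- fieldNames if dup.isEmpty) {
      // Set.add returns false when the lower-cased name was already present
      if (!seen.add(name.toLowerCase)) dup = Some(name)
    }
    dup
  }

  def main(args: Array[String]): Unit = {
    println(firstDuplicate(Seq("CNT1", "FLD-A", "cnt1"))) // Some(cnt1)
    println(firstDuplicate(Seq("CNT1", "CNT2")))          // None
  }
}
```

Asserting on this kind of stable property (which name collided) rather than the full parser-generated message is what keeps the test robust to wording changes.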
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`:
- Around line 153-156: The validator currently only logs a warning when
readerParameters.variableSizeOccurs is true with readerParameters.recordFormat
== RecordFormat.FixedLength; change this to reject the configuration by
throwing/returning a validation error in CobolParametersValidator (instead of
log.warn) when variableSizeOccurs is set with RecordFormat.FixedLength,
referencing the same flags (variableSizeOccurs and recordFormat) and
RecordFormat.FixedLength to locate the check; include a clear error message
stating that variable-size occurs cannot be used with fixed-length writes
because RDW framing is only added for RecordFormat.VariableLength (see
NestedRecordCombiner handling), so validation must fail.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala`:
- Around line 42-47: The code currently calls
ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH) and get(PARAM_SOURCE_PATH)
which throw NoSuchElementException before
CobolParametersValidator.validateOrThrow runs; change the parameter extraction
in CobolStreamer: replace direct get(...) calls with getOption(...) and supply a
safe default (e.g., .getOrElse("") ) when building the parameters Map for
PARAM_COPYBOOK_PATH and PARAM_SOURCE_PATH so that
CobolParametersValidator.validateOrThrow(parameters,
ssc.sparkContext.hadoopConfiguration) can run and produce its intended
validation errors instead of a raw exception.
---
Nitpick comments:
In
`@spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala`:
- Around line 467-476: The test in NestedWriterSuite is brittle because it
relies on AST index children(5) and exact exception text; update the test so it
finds the second CNT1 node by name (e.g., walk parsedCopybook.ast to locate the
Primitive node whose name equals "CNT1" and then call withUpdatedIsDependee on
that node) instead of using children(5), and change the assertion on the thrown
IllegalArgumentException from exact string equality to a contains check for the
stable fragment (for example verify ex.getMessage contains "Duplicate field name
'CNT1'") while keeping the call to
NestedRecordCombiner.constructWriterAst(parsedCopybook, df.schema).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 889a3459-b227-4f39-aa31-c97cb5aed499
📒 Files selected for processing (3)
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala
- spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala
```scala
if (readerParameters.variableSizeOccurs &&
  readerParameters.recordFormat == RecordFormat.FixedLength) {
  log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
}
```
Reject variable_size_occurs for fixed-length writes.
This combination can emit variable-sized records while spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala:45-76 only adds RDW framing for RecordFormat.VariableLength. With record_format=F, downstream consumers lose unambiguous record boundaries, so this should fail validation instead of warning.
🛑 Suggested validation change

```diff
-  if (readerParameters.variableSizeOccurs &&
-      readerParameters.recordFormat == RecordFormat.FixedLength) {
-    log.warn("Option 'variable_size_occurs=true' is used with 'record_format=F' which means records can have variable length. It is highly recommended to use 'record_format=V' instead.")
-  }
+  if (readerParameters.variableSizeOccurs &&
+      readerParameters.recordFormat == RecordFormat.FixedLength) {
+    issues += "Option 'variable_size_occurs=true' requires 'record_format=V' for writing"
+  }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala`
around lines 153 - 156, The validator currently only logs a warning when
readerParameters.variableSizeOccurs is true with readerParameters.recordFormat
== RecordFormat.FixedLength; change this to reject the configuration by
throwing/returning a validation error in CobolParametersValidator (instead of
log.warn) when variableSizeOccurs is set with RecordFormat.FixedLength,
referencing the same flags (variableSizeOccurs and recordFormat) and
RecordFormat.FixedLength to locate the check; include a clear error message
stating that variable-size occurs cannot be used with fixed-length writes
because RDW framing is only added for RecordFormat.VariableLength (see
NestedRecordCombiner handling), so validation must fail.
```scala
val parameters: Map[String, String] = Map[String, String](
  PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
  PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
)

CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
```
🧩 Analysis chain
🌐 Web query:
In Apache Spark's Scala API, what does `SparkConf.get(key)` do when the key is missing, and how does that differ from `SparkConf.getOption(key)`?
💡 Result:
`SparkConf.get(key)` returns the String value if set, but if the key is missing it throws `java.util.NoSuchElementException(key)`. [1]

`SparkConf.getOption(key)` returns an `Option[String]`: `Some(value)` if present, otherwise `None` (no exception). [1]
Sources:
[1] Apache Spark source (SparkConf.scala, v3.5.6) (github.com)
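Spark itself is not needed to see the difference in that contract: Scala's immutable `Map` has the same split between a throwing `apply` and an `Option`-returning `get`, which is what makes the `getOption`-based fix safe to run before validation. The config keys below are made up for illustration.

```scala
// SparkConf.get / SparkConf.getOption mirror Map.apply / Map.get:
// the direct lookup throws NoSuchElementException for a missing key,
// while the Option-returning lookup yields None. A plain Map is used
// here so the sketch runs without a Spark dependency.
object LookupContractDemo {
  def main(args: Array[String]): Unit = {
    val conf = Map("spark.cobol.copybook" -> "/path/to/copybook") // hypothetical key

    // Option-based lookup: safe for validation code paths.
    assert(conf.get("spark.cobol.copybook").contains("/path/to/copybook"))
    assert(conf.get("spark.cobol.source").isEmpty) // None, no exception

    // Direct lookup: throws before any validator can build a friendly message.
    val thrown =
      try { conf("spark.cobol.source"); false }
      catch { case _: NoSuchElementException => true }
    assert(thrown)

    println("lookup contract verified")
  }
}
```

This is why the review below asks for `getOption`: missing keys then flow into the parameters map as absent entries, and `CobolParametersValidator.validateOrThrow` gets the chance to report them properly.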
Don't call SparkConf.get() before validation.
These lookups throw NoSuchElementException before CobolParametersValidator.validateOrThrow() runs (line 47). Users see a raw config exception instead of the validator's intended error messages for missing required parameters.
Suggested fix
```diff
 implicit class Deserializer(@transient val ssc: StreamingContext) extends Serializable {
-  val parameters: Map[String, String] = Map[String, String](
-    PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
-    PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
-  )
+  private val conf = ssc.sparkContext.getConf
+  val parameters: Map[String, String] =
+    Seq(PARAM_COPYBOOK_PATH, PARAM_SOURCE_PATH)
+      .flatMap(key => conf.getOption(key).map(value => key -> value))
+      .toMap

   CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-val parameters: Map[String, String] = Map[String, String](
-  PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
-  PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
-)
-CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
+private val conf = ssc.sparkContext.getConf
+val parameters: Map[String, String] =
+  Seq(PARAM_COPYBOOK_PATH, PARAM_SOURCE_PATH)
+    .flatMap(key => conf.getOption(key).map(value => key -> value))
+    .toMap
+CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala`
around lines 42 - 47, The code currently calls
ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH) and get(PARAM_SOURCE_PATH)
which throw NoSuchElementException before
CobolParametersValidator.validateOrThrow runs; change the parameter extraction
in CobolStreamer: replace direct get(...) calls with getOption(...) and supply a
safe default (e.g., .getOrElse("") ) when building the parameters Map for
PARAM_COPYBOOK_PATH and PARAM_SOURCE_PATH so that
CobolParametersValidator.validateOrThrow(parameters,
ssc.sparkContext.hadoopConfiguration) can run and produce its intended
validation errors instead of a raw exception.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala (1)
59-75: ⚠️ Potential issue | 🟠 Major

Validate the RDW length from the written row, not the copybook maximum.

This pre-check uses `cobolSchema.getRecordSize`, so big-endian RDW rejects any copybook whose maximum size exceeds 65535 even when `variableSizeOccurs` makes each emitted row much smaller. The real length is only known at Line 165, and that path currently packs the low bytes without a per-row bounds check.

🧩 Suggested fix
```diff
-    val recordLengthLong = cobolSchema.getRecordSize.toLong + adjustment1.toLong + adjustment2.toLong
-    if (recordLengthLong < 0) {
-      throw new IllegalArgumentException(
-        s"Invalid RDW length $recordLengthLong. Check 'is_rdw_part_of_record_length' and 'rdw_adjustment'."
-      )
-    }
-    if (isRdwBigEndian && recordLengthLong > 0xFFFFL) {
-      throw new IllegalArgumentException(
-        s"RDW length $recordLengthLong exceeds 65535 and cannot be encoded in big-endian mode."
-      )
-    }
-    if (!isRdwBigEndian && recordLengthLong > Int.MaxValue.toLong) {
-      throw new IllegalArgumentException(
-        s"RDW length $recordLengthLong exceeds ${Int.MaxValue} and cannot be encoded safely."
-      )
-    }
+    if (!readerParameters.variableSizeOccurs || !hasRdw) {
+      val maxRecordLengthLong = cobolSchema.getRecordSize.toLong + adjustment1.toLong + adjustment2.toLong
+      if (maxRecordLengthLong < 0) {
+        throw new IllegalArgumentException(
+          s"Invalid RDW length $maxRecordLengthLong. Check 'is_rdw_part_of_record_length' and 'rdw_adjustment'."
+        )
+      }
+      if (isRdwBigEndian && maxRecordLengthLong > 0xFFFFL) {
+        throw new IllegalArgumentException(
+          s"RDW length $maxRecordLengthLong exceeds 65535 and cannot be encoded in big-endian mode."
+        )
+      }
+      if (!isRdwBigEndian && maxRecordLengthLong > Int.MaxValue.toLong) {
+        throw new IllegalArgumentException(
+          s"RDW length $maxRecordLengthLong exceeds ${Int.MaxValue} and cannot be encoded safely."
+        )
+      }
+    }

-    val recordLengthToWriteToRDW = bytesWritten + recordLengthAdj
+    val recordLengthToWriteToRDW = bytesWritten.toLong + recordLengthAdj.toLong
+    if (recordLengthToWriteToRDW < 0) {
+      throw new IllegalArgumentException(
+        s"Invalid RDW length $recordLengthToWriteToRDW for row. Check 'is_rdw_part_of_record_length' and 'rdw_adjustment'."
+      )
+    }
+    if (isRdwBigEndian && recordLengthToWriteToRDW > 0xFFFFL) {
+      throw new IllegalArgumentException(
+        s"RDW length $recordLengthToWriteToRDW exceeds 65535 and cannot be encoded in big-endian mode."
+      )
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala` around lines 59 - 75, The current pre-check computes recordLengthLong from cobolSchema.getRecordSize and rejects copybooks whose maximum size exceeds limits; instead, remove or bypass these global checks in NestedRecordCombiner and perform per-row RDW length validation inside processRDD where the actual written row length is known (the code path around the RDW packing at line ~165). Specifically, stop using cobolSchema.getRecordSize for big-endian/int bounds when readerParameters.variableSizeOccurs may make rows smaller, and add checks that compute the RDW length from the actual row bytes before encoding (validate >=0, <=0xFFFF for isRdwBigEndian, <=Int.MaxValue otherwise) and throw IllegalArgumentException with clear messages if a per-row length is out of range; keep processRDD, cobolSchema.copybook, recordLengthLong, isRdwBigEndian, variableSizeOccurs names to locate the changes.
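The per-row check being asked for can be sketched in isolation. The snippet below is a standalone illustration, not the Cobrix writer: `encodeBigEndianRdw` is a made-up helper, and it assumes the common RDW layout of a 2-byte big-endian length followed by 2 reserved zero bytes.

```scala
// Minimal sketch (not the Cobrix implementation) of validating the *actual*
// written length right before packing it into a big-endian RDW, rather than
// pre-validating the copybook maximum.
object RdwSketch {
  def encodeBigEndianRdw(recordLength: Long): Array[Byte] = {
    require(recordLength >= 0, s"Invalid RDW length $recordLength")
    require(recordLength <= 0xFFFFL,
      s"RDW length $recordLength exceeds 65535 and cannot be encoded in big-endian mode")
    Array(
      ((recordLength >> 8) & 0xFF).toByte, // high byte of the length
      (recordLength & 0xFF).toByte,        // low byte of the length
      0.toByte,                            // reserved
      0.toByte                             // reserved
    )
  }

  def main(args: Array[String]): Unit = {
    // A short row from a copybook whose *maximum* size exceeds 65535 still
    // encodes fine, because only the actual per-row length is validated.
    val rdw = encodeBigEndianRdw(recordLength = 300L)
    println(rdw.map(b => f"${b & 0xFF}%02X").mkString(" ")) // 01 2C 00 00
  }
}
```

Moving the bounds check to this point is what lets `variableSizeOccurs` copybooks with a large maximum size write successfully, while still failing fast on any individual row that genuinely cannot fit in a big-endian RDW.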
🧹 Nitpick comments (1)
spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala (1)
147-187: Please add a regression for `record_format = V` with `variable_size_occurs = true`.

The existing RDW coverage in `spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala:133-156` keeps `variable_size_occurs` at its default, so this new path is still unpinned. A case where the copybook max size is large but the row is short would lock down both the RDW length calculation and the final `copyOf()` behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala` around lines 147 - 187, Add a regression test in NestedWriterSuite that covers record_format = "V" with variable_size_occurs = true: construct a copybook with a large max record size but produce a short Row so processRDD/writeToBytes takes the path where hasRdw=true and variableSizeOccurs=true; verify the RDW header bytes (both big-endian and little-endian cases if supported) reflect bytesWritten + recordLengthAdj and assert the output array length equals bytesWritten + startOffset (i.e., the trimmed copy produced by java.util.Arrays.copyOf), thereby locking down RDW length calculation and the copyOf behavior referenced by writeToBytes/processRDD.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 183-187: The current logic in NestedRecordCombiner uses
java.util.Arrays.copyOf whenever variableSizeOccurs is true, which truncates the
buffer even when hasRdw is false and removes record framing; change the
condition so copyOf is only used when variableSizeOccurs is true AND hasRdw is
true; if hasRdw is false, return the original full buffer (ar) regardless of
variableSizeOccurs, or alternatively validate earlier and throw/reject when
variable_size_occurs is set but record_format is not V. Update the branch that
checks variableSizeOccurs/recordSize/bytesWritten/startOffset to reference
hasRdw before calling copyOf.
---
Outside diff comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 59-75: The current pre-check computes recordLengthLong from
cobolSchema.getRecordSize and rejects copybooks whose maximum size exceeds
limits; instead, remove or bypass these global checks in NestedRecordCombiner
and perform per-row RDW length validation inside processRDD where the actual
written row length is known (the code path around the RDW packing at line ~165).
Specifically, stop using cobolSchema.getRecordSize for big-endian/int bounds
when readerParameters.variableSizeOccurs may make rows smaller, and add checks
that compute the RDW length from the actual row bytes before encoding (validate
>=0, <=0xFFFF for isRdwBigEndian, <=Int.MaxValue otherwise) and throw
IllegalArgumentException with clear messages if a per-row length is out of
range; keep processRDD, cobolSchema.copybook, recordLengthLong, isRdwBigEndian,
variableSizeOccurs names to locate the changes.
---
Nitpick comments:
In
`@spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala`:
- Around line 147-187: Add a regression test in NestedWriterSuite that covers
record_format = "V" with variable_size_occurs = true: construct a copybook with
a large max record size but produce a short Row so processRDD/writeToBytes takes
the path where hasRdw=true and variableSizeOccurs=true; verify the RDW header
bytes (both big-endian and little-endian cases if supported) reflect
bytesWritten + recordLengthAdj and assert the output array length equals
bytesWritten + startOffset (i.e., the trimmed copy produced by
java.util.Arrays.copyOf), thereby locking down RDW length calculation and the
copyOf behavior referenced by writeToBytes/processRDD.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8862b6e6-407a-408d-b37f-05c4868108c9
📒 Files selected for processing (7)
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/Copybook.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
- cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
- cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/extract/BinaryExtractorSpec.scala
- spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/writer/NestedRecordCombiner.scala
Closes #797
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Chores